package com.lianda.sql;

import com.alibaba.fastjson.JSON;
import com.lianda.udf.UDFMain;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.io.jdbc.JDBCLookupOptions;
import org.apache.flink.api.java.io.jdbc.JDBCOptions;
import org.apache.flink.api.java.io.jdbc.JDBCTableSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

/**
 * Joins multiple data sources: a Kafka stream of user browse events is enriched
 * with product attributes looked up from a MySQL table through a JDBC lookup
 * table function.
 *
 * <p>Reference: https://blog.csdn.net/wangpei1949/article/details/103554868
 */
@Slf4j
public class LookupTableSourceMain {
    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments (not read by this method)
     * @throws Exception if the job cannot be built or its execution fails
     */
    public static void main(String[] args) throws Exception {
        // 1. Configuration parameters. These are placeholders and must be filled in
        //    before the job can connect to Kafka / MySQL.
        String kafkaBootstrapServers = "";
        String browseTopic =  "";
        String browseTopicGroupID =  "";

        String mysqlDBUrl =  "";
        String mysqlUser =  "";
        String mysqlPwd =  "";
        String mysqlTable =  "";

        // 2. Set up the execution environment (streaming mode, Blink planner).
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
        env.setParallelism(1);

        // 3. Register the Kafka source: raw JSON strings are parsed into UserBrowseLog
        //    records and exposed to SQL as table "kafka".
        Properties browseProperties = new Properties();
        browseProperties.put("bootstrap.servers",kafkaBootstrapServers);
        browseProperties.put("group.id",browseTopicGroupID);
        DataStream<UserBrowseLog> browseStream=env
                .addSource(new FlinkKafkaConsumer011<>(browseTopic, new SimpleStringSchema(), browseProperties))
                .process(new BrowseKafkaProcessFunction());
        tableEnv.registerDataStream("kafka",browseStream,"userID,eventTime,eventTimeTimestamp,eventType,productID,productPrice");

        // 4. Register the MySQL source (lookup table source).
        String[] mysqlFieldNames={"pid","productName","productCategory","updatedAt"};
        DataType[] mysqlFieldTypes={DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING()};
        TableSchema mysqlTableSchema = TableSchema.builder().fields(mysqlFieldNames, mysqlFieldTypes).build();
        JDBCOptions jdbcOptions = JDBCOptions.builder()
                .setDriverName("com.mysql.jdbc.Driver")
                .setDBUrl(mysqlDBUrl)
                .setUsername(mysqlUser)
                .setPassword(mysqlPwd)
                .setTableName(mysqlTable)
                .build();

        JDBCLookupOptions jdbcLookupOptions = JDBCLookupOptions.builder()
                .setCacheExpireMs(10 * 1000) // cache entry time-to-live, in milliseconds
                .setCacheMaxSize(10) // maximum number of cached rows
                .setMaxRetryTimes(3) // maximum number of retries
                .build();

        JDBCTableSource jdbcTableSource = JDBCTableSource.builder()
                .setOptions(jdbcOptions)
                .setLookupOptions(jdbcLookupOptions)
                .setSchema(mysqlTableSchema)
                .build();
        tableEnv.registerTableSource("mysql",jdbcTableSource);

        // Register the lookup TableFunction.
        // getLookupFunction returns a TableFunction that can be combined with the
        // LATERAL TABLE keyword to synchronously look up the rows matching the given key.
        tableEnv.registerFunction("mysqlLookup",jdbcTableSource.getLookupFunction(new String[]{"pid"}));

        // 5. Query: join every browse event with the MySQL row whose pid equals productID.
        //    Only columns that exist in the "kafka" table or in the lookup schema are
        //    selected; there is no relation aliased "f1" in this query.
        String sql = "SELECT "
                + "       userID, "
                + "       eventTime, "
                + "       eventType, "
                + "       productID, "
                + "       productPrice, "
                + "       productName, "
                + "       productCategory "
                + "FROM "
                + " kafka, "
                + " LATERAL TABLE (mysqlLookup(productID))";

        tableEnv.toAppendStream(tableEnv.sqlQuery(sql), Row.class).print();

        // 6. Start execution.
        tableEnv.execute(LookupTableSourceMain.class.getSimpleName());

    }

    /**
     * Parses raw Kafka messages (JSON strings) into {@link UserBrowseLog} records and
     * fills in the epoch-millisecond timestamp derived from the textual event time.
     */
    public static class BrowseKafkaProcessFunction extends ProcessFunction<String, UserBrowseLog> {
        /** Expected layout of {@code eventTime}; DateTimeFormatter is immutable, so it is built once. */
        private static final DateTimeFormatter EVENT_TIME_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

        /** {@code eventTime} is interpreted as Beijing time (UTC+08:00). */
        private static final ZoneOffset EVENT_TIME_OFFSET = ZoneOffset.ofHours(8);

        /**
         * Parses one message and emits the resulting record.
         *
         * <p>Messages that cannot be parsed are logged and dropped rather than failing the job.
         *
         * @param value raw JSON message read from Kafka
         * @param ctx   process-function context (unused)
         * @param out   collector receiving the parsed record
         */
        @Override
        public void processElement(String value, Context ctx, Collector<UserBrowseLog> out) throws Exception {
            try {
                UserBrowseLog browseLog = JSON.parseObject(value, UserBrowseLog.class);
                // eventTime is a "yyyy-MM-dd HH:mm:ss" string in Beijing time; attach the offset
                // so it can be converted to an absolute instant.
                OffsetDateTime eventTime = LocalDateTime.parse(browseLog.getEventTime(), EVENT_TIME_FORMAT)
                        .atOffset(EVENT_TIME_OFFSET);

                // Add a long epoch-millisecond timestamp alongside the textual event time.
                long eventTimeTimestamp = eventTime.toInstant().toEpochMilli();
                browseLog.setEventTimeTimestamp(eventTimeTimestamp);
                out.collect(browseLog);
            } catch (Exception e) {
                // Best effort: a malformed record must not bring the streaming job down.
                log.error("解析Kafka数据异常...", e);
            }
        }
    }

    /**
     * One user browse event as read from Kafka. Field names match the column list
     * used when the stream is registered as table "kafka".
     */
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class UserBrowseLog {
        private String userID;
        /** Event time as a "yyyy-MM-dd HH:mm:ss" string. */
        private String eventTime;
        /** Event time as epoch milliseconds, set by {@link BrowseKafkaProcessFunction}. */
        private long eventTimeTimestamp;
        private String eventType;
        /** Used as the lookup key against the MySQL {@code pid} column. */
        private String productID;
        private int productPrice;
    }
}
